/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;

import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;

/**
 * only has one input column
 */
public class SingleColumnMerger implements ColumnMerger {

    private final InputLocation location;

    private final TimeComparator comparator;

    public SingleColumnMerger(InputLocation location, TimeComparator comparator) {
        this.location = location;
        this.comparator = comparator;
    }

    @Override
    public void mergeColumn(
            TsBlock[] inputTsBlocks,
            int[] inputIndex,
            int[] updatedInputIndex,
            TimeColumnBuilder timeBuilder,
            long currentEndTime,
            ColumnBuilder columnBuilder) {

        mergeOneColumn(
                inputTsBlocks,
                inputIndex,
                updatedInputIndex,
                timeBuilder,
                currentEndTime,
                columnBuilder,
                location,
                comparator);
    }

    public static void mergeOneColumn(
            TsBlock[] inputTsBlocks,
            int[] inputIndex,
            int[] updatedInputIndex,
            TimeColumnBuilder timeBuilder,
            long currentEndTime,
            ColumnBuilder columnBuilder,
            InputLocation location,
            TimeComparator comparator) {
        int tsBlockIndex = location.getTsBlockIndex();
        int columnIndex = location.getValueColumnIndex();

        int rowCount = timeBuilder.getPositionCount();
        int index = inputIndex[tsBlockIndex];
        // input column is empty or current time of input column is already larger than currentEndTime
        // just appendNull rowCount null
        if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
                || !comparator.satisfyCurEndTime(
                inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
            columnBuilder.appendNull(rowCount);
        } else {
            // read from input column and write it into columnBuilder
            TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
            Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
            for (int i = 0; i < rowCount; i++) {
                // current index reaches the size of input column or current time of input column is already
                // larger than currentEndTime, use null column to fill the remaining
                if (timeColumn.getPositionCount() == index
                        || !comparator.satisfyCurEndTime(
                        inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
                    columnBuilder.appendNull(rowCount - i);
                    break;
                }
                // current time of input column is equal to result row's time
                if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
                    // if input column's value at index is null, append a null value
                    if (valueColumn.isNull(index)) {
                        columnBuilder.appendNull();
                    } else {
                        // if input column's value at index is not null, append the value
                        columnBuilder.write(valueColumn, index);
                    }
                    // increase the index
                    index++;
                } else {
                    // otherwise, append a null
                    columnBuilder.appendNull();
                }
            }
        }
        // update the index after merging
        updatedInputIndex[tsBlockIndex] = index;
    }

    @Override
    public void mergeColumn(
            TsBlock[] inputTsBlocks,
            int[] inputIndex,
            int[] updatedInputIndex,
            long currentTime,
            ColumnBuilder columnBuilder) {
        mergeOneColumn(
                inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
    }

    public static void mergeOneColumn(
            TsBlock[] inputTsBlocks,
            int[] inputIndex,
            int[] updatedInputIndex,
            long currentTime,
            ColumnBuilder columnBuilder,
            InputLocation location) {
        int tsBlockIndex = location.getTsBlockIndex();
        int columnIndex = location.getValueColumnIndex();

        int index = inputIndex[tsBlockIndex];
        // input column is empty or current time of input column is already larger than currentEndTime
        // just appendNull
        if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
                || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) {
            columnBuilder.appendNull();
        } else {
            // read from input column and write it into columnBuilder
            Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);

            if (valueColumn.isNull(index)) {
                columnBuilder.appendNull();
            } else {
                columnBuilder.write(valueColumn, index);
            }
            index++;
        }
        // update the index after merging
        updatedInputIndex[tsBlockIndex] = index;
    }
}
